package com.flow.framework.mq.consumer.listener;

import com.flow.framework.common.constant.FrameworkCommonConstant;
import com.flow.framework.common.error.SystemErrorCode;
import com.flow.framework.common.exception.CheckedException;
import com.flow.framework.common.json.JsonObject;
import com.flow.framework.common.type.TypeReference;
import com.flow.framework.common.util.date.DateUtil;
import com.flow.framework.common.util.verify.VerifyUtil;
import com.flow.framework.core.constant.FrameworkCoreConstant;
import com.flow.framework.core.holder.SecurityContextHolder;
import com.flow.framework.core.holder.SystemVersionContextHolder;
import com.flow.framework.core.pojo.dto.base.notify.BaseNotifyDto;
import com.flow.framework.core.properties.FrameworkCoreConfigProperties;
import com.flow.framework.core.system.helper.AsyncHelper;
import com.flow.framework.core.system.listener.lifecycle.ISystemLifecycleListener;
import com.flow.framework.facade.mq.module.service.IMqFrameworkModuleService;
import com.flow.framework.facade.mq.pojo.dto.MessageModuleDto;
import com.flow.framework.facade.mq.pojo.vo.RecordMessageModuleVo;
import com.flow.framework.mq.constant.FrameworkMqConstant;
import com.flow.framework.mq.consumer.IConsumer;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessageProperties;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * MQ消费者监听器
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/3/26
 */
@Slf4j
public class MessageQueueConsumerListener implements MessageListener, ISystemLifecycleListener {

    /**
     * routingKey = bizCode + FrameworkMqConstant.MQ_UNDERLINE + handleMsgServiceCode
     * 所以该属性为routingKey和consumer的map映射
     */
    private final Map<String, IConsumer<? extends BaseNotifyDto>> routingKeyAndConsumerMap = new HashMap<>();

    private FrameworkCoreConfigProperties frameworkCoreConfigProperties;

    private IMqFrameworkModuleService mqFrameworkModuleService;

    private boolean isRunning = true;

    /**
     * 创建消费者监听器
     *
     * @param routingKeyAndConsumerMap      routingKey和consumer的map映射
     * @param frameworkCoreConfigProperties 框架核心配置
     * @param mqFrameworkModuleService      mq框架服务
     */
    public MessageQueueConsumerListener(Map<String, IConsumer<? extends BaseNotifyDto>> routingKeyAndConsumerMap,
                                        FrameworkCoreConfigProperties frameworkCoreConfigProperties,
                                        IMqFrameworkModuleService mqFrameworkModuleService) {
        if (!VerifyUtil.isEmpty(routingKeyAndConsumerMap)) {
            this.routingKeyAndConsumerMap.putAll(routingKeyAndConsumerMap);
        }
        this.frameworkCoreConfigProperties = frameworkCoreConfigProperties;
        this.mqFrameworkModuleService = mqFrameworkModuleService;
    }

    /**
     * @inheritDoc
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onMessage(Message message) {
        MDC.clear();
        SystemVersionContextHolder.clear();
        SecurityContextHolder.clearAll();
        try {
            if (!isRunning) {
                log.error("service will shutdown");
                throw new CheckedException(SystemErrorCode.SERVICE_WILL_SHUTDOWN_ERROR);
            }
            MessageProperties messageProperties = message.getMessageProperties();
            String messageId = messageProperties.getMessageId();
            Map<String, Object> headers = messageProperties.getHeaders();

            // 处理trace id等脚手架相关的参数
            preprocess(headers);
            String routingKey = messageProperties.getReceivedRoutingKey();
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            IConsumer consumer = routingKeyAndConsumerMap.get(routingKey);
            VerifyUtil.requireNotNull(consumer, () -> {
                log.error("can't find consumer. route key : {}", routingKey);
                return new CheckedException(SystemErrorCode.MQ_ERROR, "can't find consumer");
            });
            TypeReference<? extends BaseNotifyDto> messageType = consumer.getMessageType();
            VerifyUtil.requireNotNull(messageType, () -> {
                log.error("can't find message type. route key : {}", routingKey);
                return new CheckedException(SystemErrorCode.MQ_ERROR, "can't find message type");
            });
            try {
                BaseNotifyDto dto = JsonObject.toBean(messageBody, messageType);
                consumer.onMessage(messageId, headers, dto);
            } catch (Exception e) {
                log.error("consume msg error.", e);
                boolean reachedMaxRetry = recordAndCheckReachedMaxRetry(headers, messageId, messageBody, messageProperties);
                if (!reachedMaxRetry) {
                    throw e;
                }
                log.info("record handle failed msg success, route key: {}, message id: {}", routingKey, messageId);
            }
        } finally {
            SystemVersionContextHolder.clear();
            SecurityContextHolder.clearAll();
            MDC.clear();
        }
    }

    private boolean recordAndCheckReachedMaxRetry(Map<String, Object> headers, String messageId, String messageBody,
                                                  MessageProperties messageProperties) {
        try {
            String routingKey = messageProperties.getReceivedRoutingKey();
            MessageModuleDto messageModuleDto = new MessageModuleDto();
            Map<String, String> metadata = new HashMap<>(16);
            metadata.put("type", "rabbit_mq");
            metadata.put("exchange", messageProperties.getReceivedExchange());
            metadata.put("routingKey", routingKey);
            metadata.put("queue", messageProperties.getConsumerQueue());
            messageModuleDto.setMetadata(metadata);

            messageModuleDto.setSystemVersion(SystemVersionContextHolder.getCurrentSystemVersion());
            messageModuleDto.setTenantId(SecurityContextHolder.getTenantIdQuietly());
            messageModuleDto.setTraceId(MDC.get(FrameworkCommonConstant.GLOBAL_LOG_TRACE_KEY));
            messageModuleDto.setBizDateTime(
                    DateUtil.parseLocalDateTime((String) headers.get(FrameworkMqConstant.MQ_SEND_DATE_TIME_KEY))
            );
            messageModuleDto.setMessageId(messageId);
            messageModuleDto.setMessageBody(messageBody);
            messageModuleDto.setMessageHeaders(messageProperties.getHeaders());
            RecordMessageModuleVo recordMessageModuleVo = mqFrameworkModuleService.recordHandleFailedMsg(messageModuleDto);
            return recordMessageModuleVo.isReachedMaxRetry();
        } catch (Exception e) {
            log.error("record handle failed msg error. message id : {}", messageId, e);
        }
        return false;
    }

    private void preprocess(Map<String, Object> headers) {
        String traceId;
        if (!VerifyUtil.isEmpty(headers)) {
            Object traceIdTemp = headers.get(FrameworkCommonConstant.GLOBAL_LOG_TRACE_KEY);
            if (!VerifyUtil.isEmpty(traceIdTemp)) {
                traceId = String.valueOf(traceIdTemp);
            } else {
                traceId = AsyncHelper.randomAsyncTraceId();
            }
            MDC.put(FrameworkCommonConstant.GLOBAL_LOG_TRACE_KEY, traceId);

            Object systemVersion = headers.get(FrameworkCommonConstant.SYSTEM_VERSION_KEY);
            if (!VerifyUtil.isEmpty(systemVersion)) {
                SystemVersionContextHolder.setCurrentSystemVersion(Long.parseLong(String.valueOf(systemVersion)));
            } else {
                SystemVersionContextHolder.clear();
            }

            Object tenantIdTemp = headers.get(FrameworkCoreConstant.BIZ_TENANT_ID_HEADER_KEY);
            if (!VerifyUtil.isEmpty(tenantIdTemp)) {
                SecurityContextHolder.setTenantId(String.valueOf(tenantIdTemp));
            } else {
                if (null != frameworkCoreConfigProperties && frameworkCoreConfigProperties.isEnableTenantSupport()) {
                    log.error("tenant is is empty");
                    throw new CheckedException(SystemErrorCode.PARAMS_ERROR);
                }
            }
        } else {
            traceId = AsyncHelper.randomAsyncTraceId();
            MDC.put(FrameworkCommonConstant.GLOBAL_LOG_TRACE_KEY, traceId);
        }
    }

    /**
     * @inheritDoc
     */
    @Override
    public void containerAckMode(AcknowledgeMode mode) {

    }

    /**
     * @inheritDoc
     */
    @Override
    public void onMessageBatch(List<Message> messages) {
        log.error("batch consume not support !!!!!!!!!!!!!!!");
        throw new CheckedException(SystemErrorCode.MQ_ERROR, "batch consume not support");
    }

    /**
     * @inheritDoc
     */
    @Override
    public void beforeShutdown() {
        isRunning = false;
    }
}
